001 /* 002 * CondorGLSFDispatcher.java 003 * 004 * Created on July 17, 2003, 11:17 AM 005 * 006 * This file is part of the STAR Scheduler. 007 * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory 008 * 009 * STAR Scheduler is free software; you can redistribute it and/or modify 010 * it under the terms of the GNU General Public License as published by 011 * the Free Software Foundation; either version 2 of the License, or 012 * (at your option) any later version. 013 * 014 * STAR Scheduler is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 017 * GNU General Public License for more details. 018 * 019 * You should have received a copy of the GNU General Public License 020 * along with STAR Scheduler; if not, write to the Free Software 021 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 022 */ 023 package gov.bnl.star.offline.scheduler.Dispatchers.condorg; 024 025 import gov.bnl.star.offline.scheduler.*; 026 import gov.bnl.star.offline.scheduler.request.Request; 027 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication; 028 import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher; 029 import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask; 030 import gov.bnl.star.offline.scheduler.util.FilesystemToolkit; 031 //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH 032 import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition; 033 034 import java.io.File; 035 import java.io.FileOutputStream; 036 import java.io.PrintStream; 037 import java.util.*; 038 039 import java.util.logging.Level; 040 import java.util.logging.Logger; 041 042 043 /** Dispatches jobs using Condor-G on a remote site that uses LSF. It will use some 044 * extra rsl attributes created to command some extra features such as mail 045 * notification, resource usage, job name and target machine. These extra LSF 046 * attribute require a patch to the LSF job manager. 047 * @author Gabriele Carcassi 048 * @version 1.0 2003/07/23 049 */ 050 public class CondorGLSFDispatcher extends LSFDispatcher { 051 static private Logger log = Logger.getLogger(CondorGLSFDispatcher.class.getName()); 052 053 private static String condorEx; 054 protected CSHApplication application; 055 056 private String ResReqDefinitionObj; 057 058 public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){ 059 this.ResReqDefinitionObj = ResReqDefinitionObj; 060 061 } 062 063 public void setCondorEx(String condorEx) { 064 this.condorEx = condorEx; 065 } 066 067 public String getCondorEx() { 068 return condorEx; 069 } 070 071 /** Creates a new dispatcher */ 072 public CondorGLSFDispatcher() { 073 } 074 075 /** Creates the scripts and dispatches the job on the target machine. 076 * @param request the job request 077 */ 078 public void dispatch(Request request, List jobs) { 079 log.info("Dispatching using Condor-g and LSF: \"" + request.getCommand() + 080 "\""); 081 082 // Enables the simulation mode if necessary 083 useSimulationMode(request.getSimulation()); 084 reportedFailure = false; 085 086 // Submits from the higher to the lower JobID. This way the 087 // user has a feel of when the last job is going to be 088 // submitted 089 for (int nProcess = jobs.size() - 1; nProcess >= 0; 090 nProcess--) { 091 Job job = (Job) jobs.get(nProcess); 092 093 System.out.print("Dispatching process " + 094 job.getJobID() + "."); 095 dispatch(request, job); 096 } 097 098 //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH 099 } 100 101 102 public void setApplication(CSHApplication application){ 103 this.application = application; 104 } 105 106 107 108 public CSHApplication getApplication(){ 109 return application; 110 } 111 112 113 protected void dispatch(Request request, Job job) { 114 //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 115 116 //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file 117 if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file 118 System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized."); 119 String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary."; 120 log.warning(notSet); 121 System.out.println(notSet); 122 application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication"); 123 } 124 125 126 // TODO: all the parameters should be passed in one go 127 application.setJob(request, job); 128 application.setScratchDir(scratchDir); 129 application.setSubmissionCommand(getCondorGCommand(request, job)); 130 131 application.prepareJob(); 132 prepareClassAd(request, job); 133 134 log.info("Executing \"" + getCondorGCommand(request, job) + "\""); 135 136 if (!simulation) { 137 try { 138 Thread.sleep(getMsBtwnSuccess()); 139 } catch (Exception e) { 140 } 141 142 long StarTime = System.currentTimeMillis(); 143 int attempt = 0; 144 boolean success = false; 145 146 while (!success && (attempt < getMaxAttempts())) { 147 try { 148 CSHCommandLineTask task = new CSHCommandLineTask(getCondorGCommand(request, job), true, 30000); 149 task.execute(); 150 151 if (task.getExitStatus() != 0) { 152 log.warning("bsub failed: " + task.getOutput()); 153 Thread.sleep(getMsBtwnFailure()); 154 System.out.print("/"); 155 attempt++; 156 } else { 157 success = true; 158 job.DispatchSuccessful(); 159 job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim()); 160 job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE))); 161 } 162 } catch (Exception e) { 163 log.log(Level.SEVERE, 164 "Couldn't submit the script to Condor-g", e); 165 166 try { 167 Thread.sleep(getMsBtwnFailure()); 168 } catch (Exception e1) { 169 } 170 171 System.out.print("/"); 172 attempt++; 173 } 174 } 175 176 if (success) { 177 System.out.println(" done."); 178 } else { 179 System.out.println(" FAILED!!"); 180 } 181 } else { 182 System.out.println(" simulated."); 183 } 184 } 185 186 /** Returns the command line to submit the job through condor-g. 187 * @param request the request that originated the job 188 * @param job the job to be dispatched 189 * @return the commandline to submit the job 190 */ 191 protected String getCondorGCommand(Request request, Job job) { 192 return condorEx + " " + getClassAdName(request, job); 193 } 194 195 /** Returns the name of the file containing the class ad. Class ad is the job 196 * description required by condor to submit a job. 197 * @param request the request that originated the job 198 * @param job the job to be submitted 199 * @return the file name of the class ad 200 */ 201 protected String getClassAdName(Request request, Job job) { 202 return "sched" + job.getJobID() + ".condorg"; 203 } 204 205 private void prepareClassAd(Request request, Job job) { 206 try { 207 PrintStream classAd = new PrintStream(new FileOutputStream( 208 new File(getClassAdName(request, job)))); 209 createClassAd(request, job, classAd); 210 } catch (Exception e) { 211 log.log(Level.SEVERE, "Couldn't create the class ad", e); 212 throw new RuntimeException("Couldn't create the class ad " + 213 getClassAdName(request, job) + ": " + e.getMessage()); 214 } 215 } 216 217 private void createClassAd(Request request, Job job, 218 PrintStream classAd) { 219 classAd.print("executable = "); 220 classAd.println(getExecutable()); 221 222 if (getArguments() != null) { 223 classAd.print("arguments = "); 224 classAd.println(getArguments()); 225 } 226 227 classAd.print("globusscheduler = "); 228 classAd.println(getGlobusScheduler()); 229 230 if (application.getStdin() != null) { 231 classAd.print("input = "); 232 classAd.println(application.getStdin()); 233 } 234 235 if (application.getStdout() != null) { 236 classAd.print("output = "); 237 classAd.println(application.getStdout()); 238 } 239 240 if (application.getStderr() != null) { 241 classAd.print("error = "); 242 classAd.println(application.getStderr()); 243 } 244 245 classAd.print("log = "); 246 classAd.println(getLogName(job)); 247 248 if (getRemoteDirectory() != null) { 249 classAd.print("remote_initialdir = "); 250 classAd.println(getRemoteDirectory()); 251 } 252 253 classAd.print("globusrsl ="); 254 255 if (job.getTarget() != null) { 256 classAd.print(" (xlsfmachine = "); 257 classAd.print(job.getTarget()); 258 classAd.print(")"); 259 } 260 261 if (application.getJobName() != null) { 262 classAd.print(" (xlsfjobname = "); 263 classAd.print(application.getJobName()); 264 classAd.print(")"); 265 } 266 267 if (request.getMail()) { 268 classAd.print(" (xlsfmailreport = "); 269 classAd.print("false"); 270 classAd.print(")"); 271 } else { 272 classAd.print(" (xlsfmailreport = "); 273 classAd.print("true"); 274 classAd.print(")"); 275 } 276 ////////////lbh 277 278 GenericResourceRequirementStringDefinition lsfResReqDef = new GenericResourceRequirementStringDefinition(); 279 if(ResReqDefinitionObj != null) 280 lsfResReqDef = (GenericResourceRequirementStringDefinition) ComponentLibrary.getInstance().getComponent(ResReqDefinitionObj); 281 282 if ((getResourceUsageSwitch(job) != null)&&( lsfResReqDef.hasResourcesDefinition(job))) { 283 284 String SD = "rusage" + getResourceUsageSwitch(job).subSequence(getResourceUsageSwitch(job).indexOf("["),getResourceUsageSwitch(job).indexOf("]")).toString() + "]"; 285 String Res = "\\\"( " + lsfResReqDef.makeString(job).replaceAll("\\\"", "").concat(" ) ").concat(SD).concat("\\\""); 286 classAd.print(" (xlsfresources = "); 287 classAd.print(Res); 288 classAd.print(")"); 289 } 290 else if(getResourceUsageSwitch(job) != null){ 291 classAd.print(" (xlsfresources = "); 292 classAd.print(getResourceUsageSwitch(job)); 293 classAd.print(")"); 294 } 295 else if( lsfResReqDef.hasResourcesDefinition(job)){ 296 classAd.print(" (xlsfresources = "); 297 classAd.print(lsfResReqDef.makeString(job)); 298 classAd.print(")"); 299 } 300 301 302 // if (getResourceUsageSwitch(job) != null) { 303 // classAd.print(" (xlsfresources = "); 304 // classAd.print(getResourceUsageSwitch(job)); 305 // classAd.print(")"); 306 // } 307 308 if (job.getQueue() != null) { 309 classAd.print(" (queue = "); 310 classAd.print(job.getQueue()); 311 classAd.print(")"); 312 } 313 314 classAd.println(); 315 316 if (isTransferExecutable()) { 317 classAd.println("transfer_executable = true"); 318 } else { 319 classAd.println("transfer_executable = false"); 320 } 321 classAd.println("notification = never"); 322 classAd.println("universe = globus"); 323 classAd.println("queue"); 324 } 325 326 private String getExecutable() { 327 if (application.getCommandLine().indexOf(' ') == -1) { 328 return application.getCommandLine(); 329 } 330 331 return application.getCommandLine().substring(0, 332 application.getCommandLine().indexOf(' ')); 333 } 334 335 private String getArguments() { 336 if (application.getCommandLine().indexOf(' ') == -1) { 337 return null; 338 } 339 340 return application.getCommandLine().substring(application.getCommandLine().indexOf(' ') + 1); 341 } 342 343 private String getLogName(Job job) { 344 // TODO maybe log filename should be put as a general property of Process (as stds) 345 return "sched" + job.getJobID() + ".condorg.log"; 346 } 347 348 private String getGlobusScheduler() { 349 //TODO make it flexible 350 return getGlobusGatekeeper(); 351 } 352 353 private String gatekeeper; 354 355 /** Holds value of property transferExecutable. */ 356 private boolean transferExecutable; 357 358 public void setGlobusGatekeeper(String gatekeeper) { 359 this.gatekeeper = gatekeeper; 360 } 361 362 public String getGlobusGatekeeper() { 363 return gatekeeper; 364 } 365 366 private String remoteInitialDir; 367 368 public void setRemoteInitialDir(String remoteInitialDir) { 369 this.remoteInitialDir = remoteInitialDir; 370 } 371 372 public String getRemoteInitialDir() { 373 return remoteInitialDir; 374 } 375 376 private String getRemoteDirectory() { 377 // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory 378 if (".".equals(getRemoteInitialDir())) return FilesystemToolkit.getCurrentDirectory(); 379 return getRemoteInitialDir(); 380 } 381 382 protected String getResourceUsageSwitch(Job job) { 383 String res = super.getResourceUsageSwitch(job); 384 if (res == null) return res; 385 386 return res.replaceAll("\"", "\\\\\""); 387 } 388 389 /** Getter for property transferExecutable. 390 * @return Value of property transferExecutable. 391 * 392 */ 393 public boolean isTransferExecutable() { 394 return this.transferExecutable; 395 } 396 397 /** Setter for property transferExecutable. 398 * @param transferExecutable New value of property transferExecutable. 399 * 400 */ 401 public void setTransferExecutable(boolean transferExecutable) { 402 this.transferExecutable = transferExecutable; 403 } 404 405 406 407 public void Kill(Request request, List jobs) { 408 //System.out.println("condor kill"); 409 410 for(int z=0; z != jobs.size(); z++){ 411 Job job = (Job)jobs.get(z); 412 413 if(job.getProcesseIDs().size() == 0){ 414 System.out.println("No ProcesseIDs found for job " + job.getJobID()); 415 jobs.remove(z); 416 z--; 417 } 418 else{ 419 for(int i=0; job.getProcesseIDs().size() != i; i++){ 420 421 int attempt = 0; 422 boolean success = false; 423 String commmandOutput = ""; 424 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">"); 425 426 while (!success && (attempt < getMaxAttempts())) { 427 try { 428 CSHCommandLineTask task = new CSHCommandLineTask("condor_rm " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime()); 429 task.execute(); 430 if (task.getExitStatus() != 0) { 431 log.warning("condor_rm " + task.getOutput()); 432 Thread.sleep(getMsBtwnFailure()); 433 if(task.getOutput().lastIndexOf("Couldn't find") != -1) success = true; 434 System.out.print(task.getOutput()); 435 attempt++; 436 } 437 else{ 438 success = true; 439 System.out.println("Killed"); 440 } 441 442 commmandOutput = task.getOutput(); 443 } 444 catch (Exception e) { System.out.print("condor_rm failed" + e); 445 System.out.print(commmandOutput); 446 } 447 try { Thread.sleep(getMsBtwnFailure());} 448 catch (Exception e1) {System.out.print("condor_rm failed");} 449 if(!success) System.out.print("/"); 450 attempt++; 451 } 452 453 } 454 job.clearProcesseIDs(); 455 jobs.remove(z); 456 z--; 457 } 458 } 459 } 460 461 public String Status(Job job, int Processe) { 462 if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID(); 463 if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist."; 464 465 466 // for(int i=0; job.getProcesseIDs().size() != i; i++){ 467 468 int attempt = 0; 469 boolean success = false; 470 String commmandOutput = ""; 471 System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">"); 472 473 while (!success && (attempt < getMaxAttempts())) { 474 try { 475 CSHCommandLineTask task = new CSHCommandLineTask("condor_q " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime()); 476 task.execute(); 477 if (task.getExitStatus() != 0) { 478 log.warning("condor_q " + task.getOutput()); 479 Thread.sleep(getMsBtwnFailure()); 480 481 // if(task.getOutput().lastIndexOf("already finished") != -1) success = true; 482 //return (task.getOutput().replace('\n',' '); 483 attempt++; 484 } 485 else{ 486 success = true; 487 488 489 if(task.getOutput().length() < 217) return("Done or Killed"); 490 else{ 491 String state = task.getOutput().substring(214,216).trim(); 492 if( state.startsWith("R")) state = "RUN"; 493 return(task.getOutput().substring(214,216).trim()); 494 } 495 496 497 } 498 499 commmandOutput = task.getOutput(); 500 } 501 catch (Exception e) { System.out.print("condor_q failed" + e); 502 System.out.print(commmandOutput); 503 } 504 try { Thread.sleep(getMsBtwnFailure());} 505 catch (Exception e1) {System.out.print("condor_q failed");} 506 if(!success) System.out.print("/"); 507 attempt++; 508 } 509 510 // } 511 512 return "condor_q failed"; 513 } 514 515 public void stop() { 516 } 517 518 }